#pragma once

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <deque>
#include <fmt/core.h>
#include <folly/Executor.h>
#include <folly/Likely.h>
#include <folly/Range.h>
#include <folly/Synchronized.h>
#include <folly/ThreadLocal.h>
#include <folly/Utility.h>
#include <folly/experimental/coro/Baton.h>
#include <folly/experimental/coro/BoundedQueue.h>
#include <folly/experimental/coro/CurrentExecutor.h>
#include <folly/experimental/coro/Mutex.h>
#include <folly/experimental/coro/Promise.h>
#include <folly/experimental/coro/Sleep.h>
#include <folly/experimental/coro/Timeout.h>
#include <folly/fibers/BatchSemaphore.h>
#include <folly/io/async/Request.h>
#include <folly/logging/xlog.h>
#include <list>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <string_view>
#include <utility>

#include "common/kv/IKVEngine.h"
#include "common/monitor/Recorder.h"
#include "common/utils/Coroutine.h"
#include "common/utils/FaultInjection.h"
#include "common/utils/IdAllocator.h"
#include "common/utils/Result.h"
#include "fbs/meta/Common.h"
#include "fdb/FDBRetryStrategy.h"

namespace hf3fs::meta::server {

/**
 * Generates InodeIds in the range 0x00000000_00001000 to 0x01ffffff_ffffffff.
 *
 * Generated InodeId format: [ high 52 bits: generated by IdAllocator ][ low 12 bits: generated locally ].
 * InodeIdAllocator first uses IdAllocator to generate a 52-bit value, then shifts it left by 12 bits and generates
 * the lower 12 bits locally. So it only needs to access FoundationDB once per 4096 generated InodeIds.
 */
class InodeIdAllocator : public std::enable_shared_from_this<InodeIdAllocator> {
  // These values are used in FoundationDB
  static std::string kAllocatorKeyPrefix;
  static constexpr size_t kAllocatorShard = 32;    // number of shards, to avoid txn conflicts
  static constexpr uint64_t kAllocatorShift = 12;  // shift 12 bit
  static constexpr uint64_t kAllocatorBit =
      64 - kAllocatorShift;  // IdAllocator generated values only have 52bits valid.
  static constexpr uint64_t kAllocatorMask = (1ULL << kAllocatorBit) - 1;
  // Number of InodeIds covered by one IdAllocator value (2^kAllocatorShift).
  static constexpr uint64_t kAllocateBatch = 1ULL << kAllocatorShift;

  // Private tag type: the constructor is public (std::make_shared needs that) but can only be called by code
  // that is able to name Tag, i.e. members of this class. Callers go through create(), so the instance is always
  // owned by a shared_ptr and weak_from_this() is usable.
  struct Tag {};

 public:
  /**
   * Use create() instead; this constructor is only callable with the private Tag.
   *
   * @param kvEngine KV engine handed to the underlying IdAllocator; kept alive by this object.
   */
  InodeIdAllocator(Tag, std::shared_ptr<kv::IKVEngine> kvEngine)
      : engine_(std::move(kvEngine)),
        allocator_(*engine_, createRetryStrategy(), kAllocatorKeyPrefix, kAllocatorShard),
        allocating_(false),
        queue_(2 * kAllocateBatch) {}  // queue is bounded to two batches of ids

  /** Create a shared_ptr-owned allocator on top of the given KV engine. */
  static std::shared_ptr<InodeIdAllocator> create(std::shared_ptr<kv::IKVEngine> kvEngine) {
    return std::make_shared<InodeIdAllocator>(Tag{}, std::move(kvEngine));
  }

  /**
   * Allocate one InodeId.
   *
   * Fast path: take an id from the local queue without waiting. When the queue drops below half a batch, a
   * background refill task is started on the current executor (at most one is started at a time).
   *
   * Slow path: the queue is empty, so fall back to allocateSlow(timeout).
   *
   * @param timeout forwarded to allocateSlow() when the local queue is empty.
   * @return the allocated InodeId; the error returned by allocateSlow(); or MetaCode::kInodeIdAllocFailed if the
   *         id obtained on the slow path is not below InodeId::kNewChunkEngineMask.
   */
  CoTryTask<InodeId> allocate(std::chrono::microseconds timeout = std::chrono::seconds(2)) {
    static monitor::CountRecorder failed("meta_inodeid_alloc_failed");

    auto id = queue_.try_dequeue();
    if (LIKELY(id.has_value())) {
      // Refill early, before the queue runs dry, so later callers stay on the fast path.
      if (queue_.size() < kAllocateBatch / 2) {
        tryStartAllocateTask(co_await folly::coro::co_current_executor);
      }
      co_return id.value();
    }
    auto result = co_await allocateSlow(timeout);
    if (result.hasError()) {
      failed.addSample(1);
      co_return result;
    }
    if (result->u64() >= InodeId::kNewChunkEngineMask) {
      failed.addSample(1);
      // Both the offending id and the bound are reported; the format string needs one placeholder per argument.
      XLOGF(DFATAL, "InodeId {} is larger than {}", *result, InodeId(InodeId::kNewChunkEngineMask));
      co_return makeError(MetaCode::kInodeIdAllocFailed, "InodeId too large, shouldn't happen");
    }
    co_return result;
  }

 private:
  /** Retry strategy passed to the underlying IdAllocator; retries are enabled for maybe-committed transactions. */
  static kv::FDBRetryStrategy createRetryStrategy() { return kv::FDBRetryStrategy({.retryMaybeCommitted = true}); }

  /**
   * Background refill coroutine.
   *
   * Holds only a weak reference while (optionally) sleeping, so a pending task does not keep the allocator alive;
   * if the allocator has been destroyed by the time the task runs, it does nothing.
   *
   * @param weak  weak reference to the allocator to refill.
   * @param delay optional time to sleep before refilling.
   */
  static CoTask<void> allocateTask(std::weak_ptr<InodeIdAllocator> weak,
                                   std::optional<folly::Duration> delay = std::nullopt) {
    if (delay.has_value()) {
      co_await folly::coro::sleep(delay.value());
    }

    auto ptr = weak.lock();
    if (ptr) {
      co_await ptr->allocateFromDB();
    }
    co_return;
  }

  /**
   * Start a background refill unless the allocating_ flag is already set.
   * NOTE(review): the flag is never cleared in this header; presumably allocateFromDB() resets it -- confirm.
   */
  void tryStartAllocateTask(folly::Executor *exec) {
    // exchange() returns the previous value, so only the caller that flips false -> true starts the task.
    if (!allocating_.exchange(true)) {
      startAllocateTask(exec);
    }
  }

  /** Unconditionally schedule a refill task on exec. */
  void startAllocateTask(folly::Executor *exec) {
    // Start the task under a fresh RequestContext; the guard restores the caller's context on scope exit.
    folly::RequestContextScopeGuard guard;
    allocateTask(weak_from_this()).scheduleOn(exec).start();
  }

  // Defined out of line.
  CoTryTask<InodeId> allocateSlow(std::chrono::microseconds timeout);
  CoTask<void> allocateFromDB();

  std::shared_ptr<kv::IKVEngine> engine_;              // keeps the engine referenced by allocator_ alive
  IdAllocator<kv::FDBRetryStrategy> allocator_;        // source of the high 52 bits
  std::atomic<bool> allocating_;                       // set when a background refill task has been started
  folly::coro::BoundedQueue<InodeId, false, false> queue_;  // locally buffered, ready-to-use InodeIds
};

}  // namespace hf3fs::meta::server
